package com.iteaj.iot.server.handle;

import com.iteaj.iot.*;
import com.iteaj.iot.codec.filter.CombinedInterceptor;
import com.iteaj.iot.codec.filter.RegisterParams;
import com.iteaj.iot.config.ConnectProperties;
import com.iteaj.iot.event.ClientStatus;
import com.iteaj.iot.event.StatusEvent;
import com.iteaj.iot.event.OfflineReason;
import com.iteaj.iot.message.UnParseBodyMessage;
import com.iteaj.iot.server.ServerMessage;
import com.iteaj.iot.server.SocketServerComponent;
import com.iteaj.iot.server.TcpServerComponent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.Attribute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * <p>Event management handler.</p>
 * Manages the platform events of TCP connections: idle (heartbeat) events,
 * registration of the device code on first message, channel activation and
 * channel disconnect.
 * <p>The handler is annotated {@code @Sharable} and exposed through
 * {@link #getInstance()}; it keeps no per-connection state in fields, all
 * per-connection data is stored in channel attributes.</p>
 * @author iteaj
 * @since 1.8
 */
@ChannelHandler.Sharable
public class EventManagerHandler extends SimpleChannelInboundHandler<UnParseBodyMessage> {

    private static final EventManagerHandler managerHandler = new EventManagerHandler();
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * @return the shared handler instance
     */
    public static EventManagerHandler getInstance() {
        return managerHandler;
    }

    /**
     * Handles idle (heartbeat) events.
     * <p>For an {@link IdleStateEvent} the component stored in the channel's
     * {@code CoreConst.COMPONENT} attribute is asked what to do via
     * {@code idle(equipCode, state)}:</p>
     * <ul>
     *     <li>an {@link IdleState} result marks the channel as closed by timeout
     *     (the configured idle time is stored in {@code CoreConst.CLIENT_TIMEOUT_CLOSED})
     *     and closes it;</li>
     *     <li>any other non-null result is written to the client;</li>
     *     <li>{@code null} means nothing is done.</li>
     * </ul>
     * The event is always forwarded to the next handler afterwards.
     *
     * @param ctx the channel handler context
     * @param evt the user event
     * @throws Exception propagated from the component or the pipeline
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) { // heartbeat event
            // Device code registered on this channel (may not be set yet)
            String equipCode = ctx.channel().attr(CoreConst.EQUIP_CODE).get();
            TcpServerComponent component = (TcpServerComponent) ctx.channel().attr(CoreConst.COMPONENT).get();
            Object idle = component.idle(equipCode, ((IdleStateEvent) evt).state());
            if(idle instanceof IdleState) { // an idle state is returned: close the connection
                ConnectProperties config = component.config();
                long timeout = idle == IdleState.ALL_IDLE ? config.getAllIdleTime() :
                        idle == IdleState.READER_IDLE ? config.getReaderIdleTime() :
                                config.getWriterIdleTime();

                // Record that the connection is being closed because the client timed out;
                // channelInactive reads this attribute to set the offline reason
                ctx.channel().attr(CoreConst.CLIENT_TIMEOUT_CLOSED).set(timeout);
                ctx.channel().close();
            } else if(idle != null) { // any other non-null value is written out as-is
                component.writeAndFlush(equipCode, idle);
            }
            // a null result means: do nothing
        }

        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Registers the device code on the first message of a channel and fills in the
     * device code on later messages that do not carry one.
     * <p>Messages without a head, and first messages whose registration yields no
     * device code, are logged and dropped (not forwarded). When registration
     * succeeds the channel is stored in the component's device manager and an
     * online {@link StatusEvent} is published.</p>
     *
     * @param ctx the channel handler context
     * @param msg the decoded message whose body has not been parsed yet
     * @throws Exception propagated from the component or the pipeline
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, UnParseBodyMessage msg) throws Exception {
        if(msg.getHead() == null) {
            logger.error("未构建出报文头[{}#doBuild(byte[])]", msg.getClass().getSimpleName());
            return;
        }

        // Device code already bound to this channel, if any
        String equipCode = ctx.channel().attr(CoreConst.EQUIP_CODE).get();
        if(null == equipCode) { // not registered yet: register now
            SocketServerComponent component = FrameworkManager
                    .getServerComponent((Class<? extends ServerMessage>) msg.getClass());

            ChannelManager deviceManager = (ChannelManager)component.getDeviceManager();

            // Let the component resolve the message head used for registration
            Message.MessageHead messageHead = component.register(msg.getHead()
                    , new RegisterParams(msg, ctx.channel(), component));

            equipCode = messageHead.getEquipCode();
            if(equipCode == null) {
                logger.error("注册设备编号({}) 报文头未设置设备编号导致设备注册失败 - 客户端地址: {}", component.getName(), ctx.channel().remoteAddress());
                return;
            }

            StatusEvent statusEvent = null;
            synchronized (component) { // TODO: consider whether this synchronization can be removed
                // Bind the device code to this channel
                Object ifAbsent = ctx.channel().attr(CoreConst.EQUIP_CODE).setIfAbsent(equipCode);
                if(ifAbsent == null) { // bound successfully
                    Channel channel = deviceManager.find(equipCode);
                    // The same device code is already mapped to a different channel
                    if(channel != null && channel != ctx.channel()) {
                        if(channel.isActive()) { // the previous connection is still open
                            if(component.isOverride()) { // the component allows replacing the previous connection
                                deviceManager.remove(equipCode); // drop the old connection
                                deviceManager.add(equipCode, ctx.channel()); // use the new connection
                                // Mark the old channel as closed because of a duplicate connection
                                channel.attr(CoreConst.CLIENT_OVERRIDE_CLOSED).set(Boolean.TRUE);
                                // https://gitee.com/iteaj/iot/issues/I7V94U
                                // The old channel is deliberately not closed here; it is left to
                                // be closed by its own idle timeout.
                            }
                        } else { // the previous connection is already closed: just replace it
                            deviceManager.remove(equipCode);
                            deviceManager.add(equipCode, ctx.channel());
                        }

                        // Guard must match the level being logged (was isDebugEnabled)
                        if(logger.isWarnEnabled()) {
                            logger.warn("客户端冲突({}) - 客户端编号: {}", component.getName(), messageHead.getEquipCode());
                        }
                    } else {
                        deviceManager.add(equipCode, ctx.channel());
                    }

                    statusEvent = new StatusEvent(equipCode, ClientStatus.online, component);
                    if(logger.isDebugEnabled()) {
                        logger.debug("客户端上线({}) - 客户端编号: {} - 客户端地址: {}", component.getName(), equipCode, ctx.channel().remoteAddress());
                    }
                } else {
                    logger.warn("客户端注册失败({}) - 客户端编号: {} - 客户端地址: {}", component.getName(), equipCode, ctx.channel().remoteAddress());
                }
            }

            // Publish outside the synchronized block so listeners do not run under the lock
            if(statusEvent != null) {
                try {
                    // Fire the device-online event
                    FrameworkManager.publishEvent(statusEvent);
                } catch (Exception e) {
                    // Pass the exception itself so the message and stack trace are logged
                    logger.error("事件发布失败({})", component.getName(), e);
                }
            }
        } else {
            // After registration, messages without a device code inherit the one
            // that was registered on this channel
            if(msg.getHead().getEquipCode() == null) {
                msg.getHead().setEquipCode(equipCode);
            }
        }

        ctx.fireChannelRead(msg);
    }

    /**
     * Handles a newly active channel.
     * <p>The component is looked up by the local port. If the component rejects the
     * channel ({@code isActivation} returns {@code false}) the channel is closed and
     * the event is not forwarded; otherwise the channel is added to the device
     * manager, its online time is recorded and the event is forwarded.</p>
     *
     * @param ctx the channel handler context
     * @throws Exception propagated from the pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        SocketServerComponent component = FrameworkManager.getTcpComponent(localAddress.getPort());
        if(!component.isActivation(channel, component)) {
            channel.close();
            return;
        }

        ChannelManager deviceManager = (ChannelManager) component.getDeviceManager();
        deviceManager.add(channel);

        // Record the time the client came online
        channel.attr(CoreConst.CLIENT_ONLINE_TIME).set(System.currentTimeMillis());

        if(logger.isTraceEnabled()) {
            logger.trace("客户端激活 客户端地址：{}", channel.remoteAddress());
        }

        super.channelActive(ctx);
    }

    /**
     * Handles a disconnected channel.
     * <p>For a registered channel (device code attribute set) an offline
     * {@link StatusEvent} is published, with reason {@link OfflineReason#timeout}
     * when the channel was closed by the idle handling. No event is published when
     * the channel was flagged as replaced by a newer connection with the same
     * device code. Publication does not depend on the configured log level.
     * The event is always forwarded to the next handler.</p>
     *
     * @param ctx the channel handler context
     * @throws Exception propagated from event publication or the pipeline
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().localAddress();
            SocketServerComponent serverComponent = FrameworkManager.getTcpComponent(address.getPort());
            InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();

            Object equipCode = ctx.channel().attr(CoreConst.EQUIP_CODE).get();
            if(equipCode != null) {
                // Was this channel superseded by a newer connection with the same device code?
                Boolean overridden = ctx.channel().attr(CoreConst.CLIENT_OVERRIDE_CLOSED).get();
                if(Boolean.TRUE.equals(overridden)) {
                    if(logger.isWarnEnabled()) {
                        logger.warn("客户端断线({}) 客户端已存在 - 客户端编号: {} - 客户端地址: {}", serverComponent.getName(), equipCode, remoteAddress);
                    }
                } else {
                    // Non-null when the channel was closed because of a read/write idle timeout
                    Long timeout = ctx.channel().attr(CoreConst.CLIENT_TIMEOUT_CLOSED).get();
                    StatusEvent clientStatusEvent = new StatusEvent(equipCode, ClientStatus.offline, serverComponent);
                    if(timeout != null) {
                        clientStatusEvent.setReason(OfflineReason.timeout);
                    }

                    // Fire the device-offline event
                    FrameworkManager.publishEvent(clientStatusEvent);
                    if(logger.isWarnEnabled()) {
                        if(timeout != null) {
                            logger.warn("客户端断线({}) 读写超时({}s) - 客户端编号: {} - 客户端地址: {}"
                                    , serverComponent.getName(), timeout, equipCode, remoteAddress);
                        } else {
                            logger.warn("客户端断线({}) 连接关闭 - 客户端编号: {} - 客户端地址: {}", serverComponent.getName(), equipCode, remoteAddress);
                        }
                    }
                }
            } else if(logger.isWarnEnabled()) {
                if(remoteAddress != null && serverComponent != null) {
                    // The channel never registered a device code
                    logger.warn("客户端断线({}) 客户端异常 - 客户端编号: 未注册 - 客户端地址: {}", serverComponent.getName(), remoteAddress);
                }
            }
        } finally {
            super.channelInactive(ctx);
        }
    }
}
